package com.cloudansys.core.flink.function;

import com.cloudansys.core.entity.MultiDataEntity;
import com.cloudansys.core.util.NumUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.*;

@Slf4j
public class RemoveMaxProcessor extends ProcessWindowFunction<List<MultiDataEntity>, List<MultiDataEntity>, String, TimeWindow> {

    /** Position of the x reading inside {@link MultiDataEntity#getValues()}. */
    private static final int X_INDEX = 2;

    /** Position of the y reading inside {@link MultiDataEntity#getValues()}. */
    private static final int Y_INDEX = 3;

    /**
     * For every sensor seen in the window, discards the single sample whose
     * {@code NumUtil.getSumOfAbs(x, y)} is largest (per the original author's note this is
     * |x| + |y|, used to drop one outlier) and reports the largest of the remaining samples.
     *
     * <p>Samples are grouped by the (projectId, serialCode) pair. The first entity seen for
     * each pair is reused as the output carrier: its x/y slots are overwritten in place with
     * the selected sample and all its other fields are left untouched. One list containing
     * one entity per sensor, in first-seen order, is emitted per window invocation.
     *
     * <p>If a sensor contributed only one sample, nothing is left after the removal and the
     * carrier entity is emitted with its original x/y values.
     * NOTE(review): that means the "removed" maximum is still emitted for single-sample
     * sensors - confirm this is the intended fallback.
     *
     * <p>The original author's note also states that, for the tilt calculation, inclinometer
     * readings are only valid when both |x| and |y| are below 1 degree. That validity check
     * is NOT performed here. The note further says the window spans 1 second; the window size
     * is configured by the caller and is not visible in this class.
     *
     * @param k        key of the keyed stream this window belongs to (not used here)
     * @param context  window context (not used here)
     * @param elements batches of entities collected in the window
     * @param out      receives a single list with one entity per sensor
     * @throws Exception if an entity's value array is {@code null} or has fewer than four
     *                   slots (surfaces as a runtime exception from the array access)
     */
    @Override
    public void process(String k, Context context, Iterable<List<MultiDataEntity>> elements, Collector<List<MultiDataEntity>> out) throws Exception {
        // A two-element list is used as the grouping key instead of projectId + serialCode:
        // plain concatenation lets distinct pairs such as ("1", "23") and ("12", "3") collide.
        // Insertion-ordered maps keep the emitted list in a deterministic, first-seen order.
        Map<List<String>, List<Double[]>> samplesBySensor = new LinkedHashMap<>();
        Map<List<String>, MultiDataEntity> carrierBySensor = new LinkedHashMap<>();

        for (List<MultiDataEntity> batch : elements) {
            for (MultiDataEntity entity : batch) {
                List<String> sensorKey = Arrays.asList(entity.getProjectId(), entity.getSerialCode());
                Double[] values = entity.getValues();
                // Snapshot the readings now; the carrier's slots are overwritten later.
                Double[] xy = new Double[]{values[X_INDEX], values[Y_INDEX]};

                List<Double[]> samples = samplesBySensor.get(sensorKey);
                if (samples == null) {
                    // First sighting of this sensor: start its sample list and keep the
                    // entity as the object that will be emitted for it.
                    samples = new ArrayList<>();
                    samplesBySensor.put(sensorKey, samples);
                    carrierBySensor.put(sensorKey, entity);
                }
                samples.add(xy);
            }
        }

        for (Map.Entry<List<String>, List<Double[]>> entry : samplesBySensor.entrySet()) {
            // Every list holds at least one sample because it is created together with one.
            List<Double[]> samples = entry.getValue();

            // Drop the single largest sample.
            samples.remove(indexOfLargest(samples));
            if (samples.isEmpty()) {
                // Single-sample sensor: the carrier keeps its original readings.
                continue;
            }

            // Report the largest of what is left.
            Double[] selected = samples.get(indexOfLargest(samples));
            Double[] carrierValues = carrierBySensor.get(entry.getKey()).getValues();
            carrierValues[X_INDEX] = selected[0];
            carrierValues[Y_INDEX] = selected[1];
        }

        out.collect(new ArrayList<>(carrierBySensor.values()));
    }

    /**
     * Returns the index of the first sample whose {@code NumUtil.getSumOfAbs(x, y)} is
     * strictly greater than that of every earlier sample and greater than zero; returns 0
     * when no sample exceeds zero.
     *
     * @param samples non-empty list of {x, y} pairs
     * @return index into {@code samples} of the selected pair
     */
    private static int indexOfLargest(List<Double[]> samples) {
        double largest = 0;
        int indexOfLargest = 0;
        for (int i = 0; i < samples.size(); i++) {
            Double[] xy = samples.get(i);
            double sumOfAbs = NumUtil.getSumOfAbs(xy[0], xy[1]);
            // Strict comparison: on ties the earliest sample wins.
            if (sumOfAbs > largest) {
                largest = sumOfAbs;
                indexOfLargest = i;
            }
        }
        return indexOfLargest;
    }

}
